package com.aydx.minirpc.core.client;

import com.aydx.minirpc.core.common.ByteArrayUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import org.apache.commons.pool2.impl.GenericObjectPool;

import static io.netty.util.ReferenceCountUtil.release;

@ChannelHandler.Sharable
public class NettyTcpClient extends ChannelInboundHandlerAdapter {
    /// 通过nio方式来接收连接和处理连接
    private EventLoopGroup group ;
    private Bootstrap bootstrap;
    private AttributeKey<NettyResoponseFuture> nettyResoponseFutureAttributeKey=AttributeKey.valueOf("NettyResoponseFuture");
    private ChannelPoolFactory<Channel> channelPoolFactory;
    private GenericObjectPool<Channel> channelPool;
    private String ip;
    private Integer port;

    private NettyTcpClient() {
    }
    public static NettyTcpClient build(String ip,Integer port,Integer groupThreadNum){
        NettyTcpClient client=new NettyTcpClient(groupThreadNum);
        client.setIp(ip);
        client.setPort(port);
        return client;
    }
    private NettyTcpClient(Integer groupThreadNum){
        groupThreadNum=groupThreadNum!=null?groupThreadNum:0;
        //初始化group
        group=new NioEventLoopGroup();
        bootstrap=new Bootstrap();
        //
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(this);
        //
        channelPoolFactory=new ChannelPoolFactory(this);
        channelPool = new GenericObjectPool<Channel>(this.channelPoolFactory);
    }
    /**
     * 发起请求
     */
    public NettyResoponseFuture call(byte[] message) throws Exception {
        byte[] data= ByteArrayUtils.arraycat(message,"{mrpc-end}".getBytes());
        NettyResoponseFuture nettyResoponseFuture=new NettyResoponseFuture();
        Channel channel=this.channelPool.borrowObject();
        channel.attr(nettyResoponseFutureAttributeKey).set(nettyResoponseFuture);
        //
        ByteBuf time = channel.alloc().buffer(data.length); // (2)
        time.writeBytes(data);
        channel.writeAndFlush(time);
        return nettyResoponseFuture;
    }

    /**
     * 关闭
     */
    public void close() throws InterruptedException {
        this.channelPool.close();
        group.shutdownGracefully();
    }
    /**
     * 收到消息的回调
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public synchronized void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.readableBytes() <= 0) {
                return;
            }
            byte[] bs = new byte[buf.readableBytes()];
            buf.readBytes(bs);
            //
            ctx.channel().attr(nettyResoponseFutureAttributeKey).get().write(bs);
            //
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            release(msg);
            //
            this.channelPool.returnObject(ctx.channel());

        }
    }
    public Channel createChannel() throws InterruptedException {
        System.out.println("启动一个连接");
        Channel channel=this.bootstrap.connect(this.ip,this.port).sync().channel();
        System.out.println("启动一个连接完成");
        return channel;
    }
    public void destoryChannel(Channel channel) throws InterruptedException {
        System.out.println("关闭一个连接");
        channel.closeFuture();
        System.out.println("关闭一个连接完成");
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }
}
